#include <glib.h>
#include <glib-object.h>
#include <gio/gio.h>

/**
 * MessageQueueSource:
 *
 * This is a #GSource which wraps a #GAsyncQueue and is dispatched whenever a
 * message can be pulled off the queue. Messages can be enqueued from any
 * thread.
 *
 * The callbacks dispatched by a #MessageQueueSource have type
 * #MessageQueueSourceFunc.
 *
 * #MessageQueueSource supports adding a #GCancellable child source which will
 * additionally dispatch if a provided #GCancellable is cancelled.
 */
typedef struct {
    GSource         parent;
    GAsyncQueue    *queue;  /* owned */
    GDestroyNotify  destroy_message;
} MessageQueueSource;

/**
 * MessageQueueSourceFunc:
 * @message: (transfer full) (nullable): message pulled off the queue
 * @user_data: user data provided to g_source_set_callback()
 *
 * Callback function type for #MessageQueueSource.
 */
typedef gboolean (*MessageQueueSourceFunc) (gpointer message,
                                            gpointer user_data);

static gboolean
message_queue_source_prepare (GSource *source,
                              gint    *timeout_) {
    MessageQueueSource *message_queue_source = (MessageQueueSource *) source;

    return (g_async_queue_length (message_queue_source->queue) > 0);
}

static gboolean
message_queue_source_dispatch (GSource     *source,
                               GSourceFunc  callback,
                               gpointer     user_data) {
    MessageQueueSource *message_queue_source = (MessageQueueSource *) source;
    gpointer message;
    MessageQueueSourceFunc func = (MessageQueueSourceFunc) callback;

    /* Pop a message off the queue. */
    message = g_async_queue_try_pop (message_queue_source->queue);

    /* If there was no message, bail. */
    if (message == NULL) {
        /* Keep the source around to handle the next message. */
        return TRUE;
    }

    /* @func may be %NULL if no callback was specified.
     * If so, drop the message. */
    if (func == NULL) {
        if (message_queue_source->destroy_message != NULL) {
            message_queue_source->destroy_message (message);
        }

        /* Keep the source around to consume the next message. */
        return TRUE;
    }

    return func (message, user_data);
}

static void
message_queue_source_finalize (GSource *source) {
    MessageQueueSource *message_queue_source = (MessageQueueSource *) source;

    g_async_queue_unref (message_queue_source->queue);
}

static gboolean
message_queue_source_closure_callback (gpointer message,
                                       gpointer user_data) {
    GClosure *closure = user_data;
    GValue param_value = G_VALUE_INIT;
    GValue result_value = G_VALUE_INIT;
    gboolean retval;

    /* The invoked function is responsible for freeing @message. */
    g_value_init (&result_value, G_TYPE_BOOLEAN);
    g_value_init (&param_value, G_TYPE_POINTER);
    g_value_set_pointer (&param_value, message);

    g_closure_invoke (closure, &result_value, 1, &param_value, NULL);
    retval = g_value_get_boolean (&result_value);

    g_value_unset (&param_value);
    g_value_unset (&result_value);

    return retval;
}

static GSourceFuncs message_queue_source_funcs = {
    message_queue_source_prepare,
    NULL,  /* check */
    message_queue_source_dispatch,
    message_queue_source_finalize,
    (GSourceFunc) message_queue_source_closure_callback,
    NULL,
};

/**
 * message_queue_source_new:
 * @queue: the queue to check
 * @destroy_message: (nullable): function to free a message, or %NULL
 * @cancellable: (nullable): a #GCancellable, or %NULL
 *
 * Create a new #MessageQueueSource, a type of #GSource which dispatches for
 * each message queued to it.
 *
 * If a callback function of type #MessageQueueSourceFunc is connected to the
 * returned #GSource using g_source_set_callback(), it will be invoked for each
 * message, with the message passed as its first argument. It is responsible for
 * freeing the message. If no callback is set, messages are automatically freed
 * as they are queued.
 *
 * Returns: (transfer full): a new #MessageQueueSource
 */
GSource *
message_queue_source_new (GAsyncQueue    *queue,
                          GDestroyNotify  destroy_message,
                          GCancellable   *cancellable) {
    GSource *source;  /* alias of @message_queue_source */
    MessageQueueSource *message_queue_source;  /* alias of @source */

    g_return_val_if_fail (queue != NULL, NULL);
    g_return_val_if_fail (cancellable == NULL ||
                          G_IS_CANCELLABLE (cancellable), NULL);

    source = g_source_new (&message_queue_source_funcs,
                           sizeof (MessageQueueSource));
    message_queue_source = (MessageQueueSource *) source;

    /* The caller can overwrite this name with something more useful later. */
    g_source_set_name (source, "MessageQueueSource");

    message_queue_source->queue = g_async_queue_ref (queue);
    message_queue_source->destroy_message = destroy_message;

    /* Add a cancellable source. */
    if (cancellable != NULL){
        GSource *cancellable_source;

        cancellable_source = g_cancellable_source_new (cancellable);
        g_source_set_dummy_callback (cancellable_source);
        g_source_add_child_source (source, cancellable_source);
        g_source_unref (cancellable_source);
    }

    return source;
}


/* Utility functions. */
//source callback
static gboolean source_cb (gpointer message, gpointer user_data) {
    guint *seen_i = user_data;

    g_assert_cmpuint (GPOINTER_TO_UINT (message), ==, *seen_i);
    *seen_i = *seen_i + 1;

    return TRUE;
}

static guint global_destroy_calls = 0;

//message destroy callback
static void destroy_cb (gpointer user_data) {
    global_destroy_calls++;
}

/* Tests. */
// 构建测试对象
static void test_construction (void) {
    GSource *source = NULL;
    GAsyncQueue *queue = NULL;
    
    queue = g_async_queue_new ();
    source = message_queue_source_new (queue, NULL, NULL);
    g_source_unref (source);
    g_async_queue_unref (queue);
}

static void
test_messages_with_callback (void) {
    GSource *source;
    GAsyncQueue *queue;
    guint i, seen_i;
    GMainContext *main_context;
  
    queue = g_async_queue_new_full (destroy_cb);
    source = message_queue_source_new (queue, destroy_cb, NULL);
  
    /* Enqueue some messages. */
    for (i = 1; i <= 100; i++){
        g_async_queue_push (queue, GUINT_TO_POINTER (i));
    }
  
    /* Create and run a main loop. Expect to see the messages in order. */
    seen_i = 1;
    global_destroy_calls = 0;
  
    main_context = g_main_context_new ();
    g_source_set_callback (source, (GSourceFunc) source_cb, &seen_i, NULL);
    g_source_attach (source, main_context);
  
    while (g_main_context_iteration (main_context, FALSE));
  
    g_assert_cmpuint (global_destroy_calls, ==, 0);
    g_assert_cmpuint (seen_i, ==, 101);
    g_assert_cmpuint (g_async_queue_length (queue), ==, 0);
  
    g_main_context_unref (main_context);
    g_async_queue_unref (queue);
    g_source_unref (source);
}

static void
test_messages_without_callback (void) {
    GSource *source;
    GAsyncQueue *queue;
    guint i;
    GMainContext *main_context;

    queue = g_async_queue_new_full (destroy_cb);
    source = message_queue_source_new (queue, destroy_cb, NULL);

    /* Enqueue some messages. */
    for (i = 1; i <= 100; i++) {
        g_async_queue_push (queue, GUINT_TO_POINTER (i));
    }

    /* Create and run a main loop. */
    main_context = g_main_context_new ();
    g_source_attach (source, main_context);

    while (g_main_context_iteration (main_context, FALSE));

    g_assert_cmpuint (global_destroy_calls, ==, 100);
    g_assert_cmpuint (g_async_queue_length (queue), ==, 0);

    g_main_context_unref (main_context);
    g_async_queue_unref (queue);
    g_source_unref (source);
}

int main (int argc, char **argv) {
    g_test_init (&argc, &argv, NULL);

    g_test_add_func ("/custom-gsource/construction", test_construction);
    g_test_add_func ("/custom-gsource/messages-with-callback",
                     test_messages_with_callback);
    g_test_add_func ("/custom-gsource/messages-without-callback",
                     test_messages_without_callback);

    return g_test_run ();
}
